package kafka

import (
	"context"
	log "gitee.com/vrv_media/go-micro-framework/pkg/logger"
	"github.com/segmentio/kafka-go"
	"time"
)

// Producer wraps a kafka-go Writer together with its target address,
// topic, and a named logger for error reporting.
type Producer struct {
	logger   log.LoggerHelper // named logger ("kafka-producer"); used to report send failures
	Addr     string           // broker address the writer connects to
	Topic    string           // topic every message is published to
	producer *kafka.Writer    // underlying kafka-go writer; configured in NewKafkaProducerClient
}

// NewKafkaProducerClient builds a Producer that publishes to the given
// topic at the given broker address.
//
// Writer configuration choices:
//   - Hash balancer: the message key is hashed to pick the partition.
//   - WriteTimeout of one second: under heavy broker load the write is
//     abandoned after the timeout, so messages may be dropped — only use
//     this producer where message loss is acceptable.
//   - RequireNone acks: returns without waiting for any broker
//     acknowledgement (fire-and-forget).
//   - AllowAutoTopicCreation is left false: sending to a missing topic
//     fails instead of silently creating it.
//
// All other Writer fields keep their zero-value defaults.
func NewKafkaProducerClient(addr, topic string) *Producer {
	producerClient := &Producer{
		logger: log.NewNameLogWithDefaultLog("kafka-producer"),
		Addr:   addr,
		Topic:  topic,
	}
	producerClient.producer = &kafka.Writer{
		// kafka.TCP is variadic; pass multiple addresses for a cluster.
		Addr:         kafka.TCP(addr),
		Topic:        topic,
		Balancer:     &kafka.Hash{},
		WriteTimeout: time.Second,
		RequiredAcks: kafka.RequireNone,
	}
	return producerClient
}

// SendMessage publishes message to the writer's configured topic.
// The message Topic must stay empty because the Writer already has one
// set (kafka-go rejects messages that set both). Errors are logged and
// also returned to the caller.
//
// NOTE(review): the writer uses a Hash balancer, but no Key is set
// here, so partition selection is effectively not key-based — confirm
// whether a key parameter should be threaded through.
func (p *Producer) SendMessage(ctx context.Context, message string) error {
	msg := kafka.Message{
		Value: []byte(message),
	}
	err := p.producer.WriteMessages(ctx, msg)
	if err != nil {
		p.logger.ErrorF("kafka send message error[%s]", err.Error())
	}
	return err
}
